diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 284cc422125..48a4ef9b9d1 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1470,11 +1470,18 @@ func (d *AuthenticatedGossiper) networkHandler() { // We'll set up any dependent, and wait until a free // slot for this job opens up, this allow us to not // have thousands of goroutines active. - validationBarrier.InitJobDependencies(announcement.msg) + annJobID, err := validationBarrier.InitJobDependencies( + announcement.msg, + ) + if err != nil { + announcement.err <- err + continue + } d.wg.Add(1) go d.handleNetworkMessages( - announcement, &announcements, validationBarrier, + announcement, &announcements, + validationBarrier, annJobID, ) // The trickle timer has ticked, which indicates we should @@ -1525,7 +1532,8 @@ func (d *AuthenticatedGossiper) networkHandler() { // // NOTE: must be run as a goroutine. func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg, - deDuped *deDupedAnnouncements, vb *graph.ValidationBarrier) { + deDuped *deDupedAnnouncements, vb *graph.ValidationBarrier, + jobID graph.JobID) { defer d.wg.Done() defer vb.CompleteJob() @@ -1536,7 +1544,7 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg, // If this message has an existing dependency, then we'll wait until // that has been fully validated before we proceed. - err := vb.WaitForDependants(nMsg.msg) + err := vb.WaitForParents(jobID, nMsg.msg) if err != nil { log.Debugf("Validating network message %s got err: %v", nMsg.msg.MsgType(), err) @@ -1566,7 +1574,7 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg, // If this message had any dependencies, then we can now signal them to // continue. - vb.SignalDependants(nMsg.msg, allow) + vb.SignalDependents(nMsg.msg, jobID) // If the announcement was accepted, then add the emitted announcements // to our announce batch to be broadcast once the trickle timer ticks diff --git a/graph/validation_barrier.go b/graph/validation_barrier.go index 3cbe950ee3b..1cd07ae5985 100644 --- a/graph/validation_barrier.go +++ b/graph/validation_barrier.go @@ -3,29 +3,34 @@ package graph import ( "fmt" "sync" + "sync/atomic" + "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" ) -// validationSignals contains two signals which allows the ValidationBarrier to -// communicate back to the caller whether a dependent should be processed or not -// based on whether its parent was successfully validated. Only one of these -// signals is to be used at a time. -type validationSignals struct { - // allow is the signal used to allow a dependent to be processed. - allow chan struct{} +// JobID identifies an active job in the validation barrier. It is large so +// that we don't need to worry about overflows. +type JobID uint64 - // deny is the signal used to prevent a dependent from being processed. - deny chan struct{} +// jobInfo stores job dependency info for a set of dependent gossip messages. +type jobInfo struct { + // activeParentJobIDs is the set of active parent job ids. + activeParentJobIDs fn.Set[JobID] + + // activeDependentJobs is the set of active dependent job ids. + activeDependentJobs fn.Set[JobID] } -// ValidationBarrier is a barrier used to ensure proper validation order while -// concurrently validating new announcements for channel edges, and the -// attributes of channel edges. It uses this set of maps (protected by this -// mutex) to track validation dependencies. For a given channel our -// dependencies look like this: chanAnn <- chanUp <- nodeAnn. That is we must -// validate the item on the left of the arrow before that on the right. +// ValidationBarrier is a barrier used to enforce a strict validation order +// while concurrently validating other updates for channel edges. It uses a set +// of maps to track validation dependencies. This is needed in practice because +// gossip messages for a given channel may arive in order, but then due to +// scheduling in different goroutines, may be validated in the wrong order. +// With the ValidationBarrier, the dependent update will wait until the parent +// update completes. type ValidationBarrier struct { // validationSemaphore is a channel of structs which is used as a // semaphore. Initially we'll fill this with a buffered channel of the @@ -33,23 +38,25 @@ type ValidationBarrier struct { // from this channel, then restore the value upon completion. validationSemaphore chan struct{} - // chanAnnFinSignal is map that keep track of all the pending - // ChannelAnnouncement like validation job going on. Once the job has - // been completed, the channel will be closed unblocking any - // dependants. - chanAnnFinSignal map[lnwire.ShortChannelID]*validationSignals + // jobIDMap stores the set of job ids for each channel. + // NOTE: This MUST be used with the mutex. + // NOTE: We don't need to worry about collisions between + // lnire.ShortChannelID and route.Vertex because they are of different + // length and entries therefore cannot hash to the same keys. + jobInfoMap map[any]*jobInfo + + // jobDependencies is a mapping from a child's JobID to the set of + // parent JobID that it depends on. + // NOTE: This MUST be used with the mutex. + jobDependencies map[JobID]fn.Set[JobID] - // chanEdgeDependencies tracks any channel edge updates which should - // wait until the completion of the ChannelAnnouncement before - // proceeding. This is a dependency, as we can't validate the update - // before we validate the announcement which creates the channel - // itself. - chanEdgeDependencies map[lnwire.ShortChannelID]*validationSignals + // childJobChans stores the notification channel that each child job + // listens on for parent job completions. + // NOTE: This MUST be used with the mutex. + childJobChans map[JobID]chan struct{} - // nodeAnnDependencies tracks any pending NodeAnnouncement validation - // jobs which should wait until the completion of the - // ChannelAnnouncement before proceeding. - nodeAnnDependencies map[route.Vertex]*validationSignals + // idCtr is an atomic integer that is used to assign JobIDs. + idCtr atomic.Uint64 quit chan struct{} sync.Mutex @@ -62,10 +69,10 @@ func NewValidationBarrier(numActiveReqs int, quitChan chan struct{}) *ValidationBarrier { v := &ValidationBarrier{ - chanAnnFinSignal: make(map[lnwire.ShortChannelID]*validationSignals), - chanEdgeDependencies: make(map[lnwire.ShortChannelID]*validationSignals), - nodeAnnDependencies: make(map[route.Vertex]*validationSignals), - quit: quitChan, + jobInfoMap: make(map[any]*jobInfo), + jobDependencies: make(map[JobID]fn.Set[JobID]), + childJobChans: make(map[JobID]chan struct{}), + quit: quitChan, } // We'll first initialize a set of semaphores to limit our concurrency @@ -92,7 +99,9 @@ func (v *ValidationBarrier) FetchJobSlot() { // InitJobDependencies will wait for a new job slot to become open, and then // sets up any dependent signals/trigger for the new job -func (v *ValidationBarrier) InitJobDependencies(job interface{}) { +func (v *ValidationBarrier) InitJobDependencies(job interface{}) (JobID, + error) { + // We'll wait for either a new slot to become open, or for the quit // channel to be closed. select { @@ -106,47 +115,100 @@ func (v *ValidationBarrier) InitJobDependencies(job interface{}) { // Once a slot is open, we'll examine the message of the job, to see if // there need to be any dependent barriers set up. switch msg := job.(type) { - - // If this is a channel announcement, then we'll need to set up den - // tenancies, as we'll need to verify this before we verify any - // ChannelUpdates for the same channel, or NodeAnnouncements of nodes - // that are involved in this channel. This goes for both the wire - // type,s and also the types that we use within the database. case *lnwire.ChannelAnnouncement1: + id := JobID(v.idCtr.Add(1)) - // We ensure that we only create a new announcement signal iff, - // one doesn't already exist, as there may be duplicate - // announcements. We'll close this signal once the - // ChannelAnnouncement has been validated. This will result in - // all the dependent jobs being unlocked so they can finish - // execution themselves. - if _, ok := v.chanAnnFinSignal[msg.ShortChannelID]; !ok { - // We'll create the channel that we close after we - // validate this announcement. All dependants will - // point to this same channel, so they'll be unblocked - // at the same time. - signals := &validationSignals{ - allow: make(chan struct{}), - deny: make(chan struct{}), - } - - v.chanAnnFinSignal[msg.ShortChannelID] = signals - v.chanEdgeDependencies[msg.ShortChannelID] = signals + v.updateOrCreateJobInfo(msg.ShortChannelID, id) + v.updateOrCreateJobInfo(route.Vertex(msg.NodeID1), id) + v.updateOrCreateJobInfo(route.Vertex(msg.NodeID2), id) - v.nodeAnnDependencies[route.Vertex(msg.NodeID1)] = signals - v.nodeAnnDependencies[route.Vertex(msg.NodeID2)] = signals - } + return id, nil - // These other types don't have any dependants, so no further - // initialization needs to be done beyond just occupying a job slot. + // Populate the dependency mappings for the below child jobs. case *lnwire.ChannelUpdate1: - return + childJobID := JobID(v.idCtr.Add(1)) + v.populateDependencies(childJobID, msg.ShortChannelID) + + return childJobID, nil case *lnwire.NodeAnnouncement: - // TODO(roasbeef): node ann needs to wait on existing channel updates - return + childJobID := JobID(v.idCtr.Add(1)) + v.populateDependencies(childJobID, route.Vertex(msg.NodeID)) + + return childJobID, nil case *lnwire.AnnounceSignatures1: // TODO(roasbeef): need to wait on chan ann? - return + // - We can do the above by calling populateDependencies. For + // now, while we evaluate potential side effects, don't do + // anything with childJobID and just return it. + childJobID := JobID(v.idCtr.Add(1)) + return childJobID, nil + + default: + // An invalid message was passed into InitJobDependencies. + // Return an error. + return JobID(0), errors.New("invalid message") + } +} + +// updateOrCreateJobInfo modifies the set of activeParentJobs for this annID +// and updates jobInfoMap. This must only be called from InitJobDependencies. +// NOTE: MUST be called with the mutex held. +func (v *ValidationBarrier) updateOrCreateJobInfo(annID any, annJobID JobID) { + info, ok := v.jobInfoMap[annID] + if ok { + // If an entry already exists for annID, then a job related to + // it is being validated. Add to the set of parent job ids. + // This addition will only affect _later_, _child_ jobs for the + // annID. + info.activeParentJobIDs.Add(annJobID) + } else { + // No entry exists for annID, meaning that we should create + // one. + parentJobSet := fn.NewSet(annJobID) + + info := &jobInfo{ + activeParentJobIDs: parentJobSet, + activeDependentJobs: fn.NewSet[JobID](), + } + v.jobInfoMap[annID] = info + } +} + +// populateDependencies populates the job dependency mappings (i.e. which jobs +// should complete after another) for the (childJobID, annID) tuple. This must +// only be called from InitJobDependencies. +// NOTE: MUST be called with the mutex held. +func (v *ValidationBarrier) populateDependencies(childJobID JobID, + annID any) { + + // If there is no entry in the jobInfoMap, we don't have to wait on any + // parent jobs to finish. + info, ok := v.jobInfoMap[annID] + if ok { + // We want to see a snapshot of active parent jobs for this + // annID that are already registered in activeParentJobIDs. The + // child job identified by childJobID can only run after these + // parent jobs have run. After grabbing the snapshot, we then + // want to persist a slice of these jobs. + + // Create the notification chan that parent jobs will send (or + // close) on when they complete. + jobChan := make(chan struct{}) + + // Add to set of activeDependentJobs for this annID. + info.activeDependentJobs.Add(childJobID) + + // Store in childJobChans. The parent jobs will fetch this chan + // to notify on. The child job will later fetch this chan to + // listen on when WaitForParents is called. + v.childJobChans[childJobID] = jobChan + + // Copy over the parent job IDs at this moment for this annID. + // This job must be processed AFTER these parent IDs. + parentJobs := info.activeParentJobIDs.Union(fn.NewSet[JobID]()) + + // Populate the jobDependencies mapping. + v.jobDependencies[childJobID] = parentJobs } } @@ -161,16 +223,21 @@ func (v *ValidationBarrier) CompleteJob() { } } -// WaitForDependants will block until any jobs that this job dependants on have -// finished executing. This allows us a graceful way to schedule goroutines -// based on any pending uncompleted dependent jobs. If this job doesn't have an -// active dependent, then this function will return immediately. -func (v *ValidationBarrier) WaitForDependants(job interface{}) error { +// WaitForParents will block until all parent job dependencies have went +// through the validation pipeline. This allows us a graceful way to run jobs +// in goroutines and still have strict ordering guarantees. If this job doesn't +// have any validation dependencies, then this function will return +// immediately. +func (v *ValidationBarrier) WaitForParents(childJobID JobID, + job interface{}) error { var ( - signals *validationSignals ok bool jobDesc string + + parentJobIDs fn.Set[JobID] + annID any + jobChan chan struct{} ) // Acquire a lock to read ValidationBarrier. @@ -180,16 +247,22 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error { // Any ChannelUpdate or NodeAnnouncement jobs will need to wait on the // completion of any active ChannelAnnouncement jobs related to them. case *lnwire.ChannelUpdate1: - signals, ok = v.chanEdgeDependencies[msg.ShortChannelID] + annID = msg.ShortChannelID + + // TODO: If ok is false, we have serious issues. + parentJobIDs, ok = v.jobDependencies[childJobID] jobDesc = fmt.Sprintf("job=lnwire.ChannelUpdate, scid=%v", msg.ShortChannelID.ToUint64()) case *lnwire.NodeAnnouncement: - vertex := route.Vertex(msg.NodeID) - signals, ok = v.nodeAnnDependencies[vertex] + annID = route.Vertex(msg.NodeID) + + // TODO: If ok is false, we have serious issues. + parentJobIDs, ok = v.jobDependencies[childJobID] + jobDesc = fmt.Sprintf("job=lnwire.NodeAnnouncement, pub=%s", - vertex) + route.Vertex(msg.NodeID)) // Other types of jobs can be executed immediately, so we'll just // return directly. @@ -210,58 +283,176 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error { log.Debugf("Waiting for dependent on %s", jobDesc) - // If we do have an active job, then we'll wait until either the signal - // is closed, or the set of jobs exits. - select { - case <-v.quit: - return NewErrf(ErrVBarrierShuttingDown, - "validation barrier shutting down") - - case <-signals.deny: - log.Debugf("Signal deny for %s", jobDesc) - return NewErrf(ErrParentValidationFailed, - "parent validation failed") + v.Lock() + jobChan, ok = v.childJobChans[childJobID] + if !ok { + v.Unlock() - case <-signals.allow: - log.Tracef("Signal allow for %s", jobDesc) + // The entry may not exist because this job does not depend on + // any parent jobs. return nil } + v.Unlock() + + for { + select { + case <-v.quit: + return NewErrf(ErrVBarrierShuttingDown, + "validation barrier shutting down") + + case <-jobChan: + // Every time this is sent on or if it's closed, a + // parent job has finished. The parent jobs have to + // also potentially close the channel because if all + // the parent jobs finish and call SignalDependents + // before the goroutine running WaitForParents has a + // chance to grab the notification chan from + // childJobChans, then the running goroutine will wait + // here for a notification forever. By having the last + // parent job close the notificiation chan, we avoid + // this issue. + + // Check and see if we have any parent jobs left. If we + // don't, we can finish up. + v.Lock() + info, found := v.jobInfoMap[annID] + if !found { + v.Unlock() + + // No parent job info found, proceed with + // validation. + return nil + } + + x := parentJobIDs.Intersect(info.activeParentJobIDs) + v.Unlock() + if x.Empty() { + // The parent jobs have all completed. We can + // proceed with validation. + return nil + } + + // If we've reached this point, we are still waiting on + // a parent job to complete. + } + } } -// SignalDependants will allow/deny any jobs that are dependent on this job that -// they can continue execution. If the job doesn't have any dependants, then -// this function sill exit immediately. -func (v *ValidationBarrier) SignalDependants(job interface{}, allow bool) { +// SignalDependents signals to any child jobs that this parent job has +// finished. +func (v *ValidationBarrier) SignalDependents(job interface{}, id JobID) error { v.Lock() defer v.Unlock() switch msg := job.(type) { - - // If we've just finished executing a ChannelAnnouncement, then we'll - // close out the signal, and remove the signal from the map of active - // ones. This will allow/deny any dependent jobs to continue execution. case *lnwire.ChannelAnnouncement1: - finSignals, ok := v.chanAnnFinSignal[msg.ShortChannelID] - if ok { - if allow { - close(finSignals.allow) - } else { - close(finSignals.deny) - } - delete(v.chanAnnFinSignal, msg.ShortChannelID) + // Signal to the child jobs that parent validation has + // finished. We have to call removeParentJob for each annID + // that this ChannelAnnouncement can be associated with. + err := v.removeParentJob(msg.ShortChannelID, id) + if err != nil { + return err + } + + err = v.removeParentJob(route.Vertex(msg.NodeID1), id) + if err != nil { + return err + } + + err = v.removeParentJob(route.Vertex(msg.NodeID2), id) + if err != nil { + return err } - delete(v.chanEdgeDependencies, msg.ShortChannelID) + return nil - // For all other job types, we'll delete the tracking entries from the - // map, as if we reach this point, then all dependants have already - // finished executing and we can proceed. case *lnwire.NodeAnnouncement: - delete(v.nodeAnnDependencies, route.Vertex(msg.NodeID)) + // Remove child job info. + v.removeChildJob(route.Vertex(msg.NodeID), id) + return nil case *lnwire.ChannelUpdate1: - delete(v.chanEdgeDependencies, msg.ShortChannelID) + // Remove child job info. + v.removeChildJob(msg.ShortChannelID, id) + return nil case *lnwire.AnnounceSignatures1: - return + // No dependency mappings are stored for AnnounceSignatures1, + // so do nothing. + return nil + } + + return errors.New("invalid message - no job dependencies") +} + +// removeParentJob removes parentJobID from the set of active parent jobs and +// notifies to annID's child jobs that it has finished validating. This must be +// called from SignalDependents. +// NOTE: MUST be called with a mutex. +func (v *ValidationBarrier) removeParentJob(annID any, + parentJobID JobID) error { + + // Remove the parentJobID from activeParentJobIDs. + jobInfo, found := v.jobInfoMap[annID] + if !found { + // NOTE: Something seriously wrong has happened. + return fmt.Errorf("no job info found for identifier(%v)", + annID) + } + + jobInfo.activeParentJobIDs.Remove(parentJobID) + + lastJob := jobInfo.activeParentJobIDs.Empty() + + // Notify all dependent jobs that a parent job has completed. + for child := range jobInfo.activeDependentJobs { + notifyChan, ok := v.childJobChans[child] + if !ok { + // Note: Something seriously wrong has happened. + return fmt.Errorf("no child job chan for child "+ + "JobID(%v)", child) + } + + // We don't want to block when sending out the signal. + select { + case notifyChan <- struct{}{}: + default: + } + + // If this is the last parent job for this annID, also close + // the channel. This is needed because it's possible that the + // parent job cleans up the job mappings before the goroutine + // handling the child job has a chance to call + // WaitForParents and catch the signal sent above. We are + // allowed to close because no other parent job will be able to + // send along the channel (or close) as we're removing the + // entry from the jobInfoMap below. + if lastJob { + close(notifyChan) + } } + + // Remove from jobInfoMap if last job. + if lastJob { + delete(v.jobInfoMap, annID) + } + + return nil +} + +// removeChildJob removes childJobID from the set of dependent jobs for annID +// and cleans up its job dependency mappings. This MUST be called from +// SignalDependents. +// NOTE: MUST be called with the mutex held. +func (v *ValidationBarrier) removeChildJob(annID any, childJobID JobID) { + // Check jobInfoMap and remove this job from activeDependentJobs. + info, ok := v.jobInfoMap[annID] + if ok { + info.activeDependentJobs.Remove(childJobID) + } + + // Remove the notification chan from childJobChans. + delete(v.childJobChans, childJobID) + + // Remove this job's dependency mapping. + delete(v.jobDependencies, childJobID) } diff --git a/graph/validation_barrier_test.go b/graph/validation_barrier_test.go index 38fc7a0870c..c717f937aef 100644 --- a/graph/validation_barrier_test.go +++ b/graph/validation_barrier_test.go @@ -7,6 +7,7 @@ import ( "github.com/lightningnetwork/lnd/graph" "github.com/lightningnetwork/lnd/lnwire" + "github.com/stretchr/testify/require" ) // TestValidationBarrierSemaphore checks basic properties of the validation @@ -74,23 +75,31 @@ func TestValidationBarrierQuit(t *testing.T) { // Create a set of unique channel announcements that we will prep for // validation. anns := make([]*lnwire.ChannelAnnouncement1, 0, numTasks) + parentJobIDs := make([]graph.JobID, 0, numTasks) for i := 0; i < numTasks; i++ { anns = append(anns, &lnwire.ChannelAnnouncement1{ ShortChannelID: lnwire.NewShortChanIDFromInt(uint64(i)), NodeID1: nodeIDFromInt(uint64(2 * i)), NodeID2: nodeIDFromInt(uint64(2*i + 1)), }) - barrier.InitJobDependencies(anns[i]) + parentJobID, err := barrier.InitJobDependencies(anns[i]) + require.NoError(t, err) + + parentJobIDs = append(parentJobIDs, parentJobID) } // Create a set of channel updates, that must wait until their // associated channel announcement has been verified. chanUpds := make([]*lnwire.ChannelUpdate1, 0, numTasks) + childJobIDs := make([]graph.JobID, 0, numTasks) for i := 0; i < numTasks; i++ { chanUpds = append(chanUpds, &lnwire.ChannelUpdate1{ ShortChannelID: lnwire.NewShortChanIDFromInt(uint64(i)), }) - barrier.InitJobDependencies(chanUpds[i]) + childJob, err := barrier.InitJobDependencies(chanUpds[i]) + require.NoError(t, err) + + childJobIDs = append(childJobIDs, childJob) } // Spawn additional tasks that will send the error returned after @@ -100,7 +109,9 @@ func TestValidationBarrierQuit(t *testing.T) { jobErrs := make(chan error) for i := 0; i < numTasks; i++ { go func(ii int) { - jobErrs <- barrier.WaitForDependants(chanUpds[ii]) + jobErrs <- barrier.WaitForParents( + childJobIDs[ii], chanUpds[ii], + ) }(i) } @@ -120,11 +131,19 @@ func TestValidationBarrierQuit(t *testing.T) { // Signal completion for the first half of tasks, but only allow // dependents to be processed as well for the second quarter. case i < numTasks/4: - barrier.SignalDependants(anns[i], false) + err := barrier.SignalDependents( + anns[i], parentJobIDs[i], + ) + require.NoError(t, err) + barrier.CompleteJob() case i < numTasks/2: - barrier.SignalDependants(anns[i], true) + err := barrier.SignalDependents( + anns[i], parentJobIDs[i], + ) + require.NoError(t, err) + barrier.CompleteJob() // At midpoint, quit the validation barrier. @@ -141,9 +160,7 @@ func TestValidationBarrierQuit(t *testing.T) { switch { // First half should return without failure. - case i < numTasks/4 && !graph.IsError( - err, graph.ErrParentValidationFailed, - ): + case i < numTasks/4 && err != nil: t.Fatalf("unexpected failure while waiting: %v", err) case i >= numTasks/4 && i < numTasks/2 && err != nil: @@ -159,6 +176,128 @@ func TestValidationBarrierQuit(t *testing.T) { } } +// TestValidationBarrierParentJobsClear tests that creating two parent jobs for +// ChannelUpdate / NodeAnnouncement will pause child jobs until the set of +// parent jobs has cleared. +func TestValidationBarrierParentJobsClear(t *testing.T) { + t.Parallel() + + const ( + numTasks = 8 + timeout = time.Second + ) + + quit := make(chan struct{}) + barrier := graph.NewValidationBarrier(numTasks, quit) + + sharedScid := lnwire.NewShortChanIDFromInt(0) + sharedNodeID := nodeIDFromInt(0) + + // Create a set of gossip messages that depend on each other. ann1 and + // ann2 share the ShortChannelID field. ann1 and ann3 share both the + // ShortChannelID field and the NodeID1 field. These shared values let + // us test the "set" properties of the ValidationBarrier. + ann1 := &lnwire.ChannelAnnouncement1{ + ShortChannelID: sharedScid, + NodeID1: sharedNodeID, + NodeID2: nodeIDFromInt(1), + } + parentID1, err := barrier.InitJobDependencies(ann1) + require.NoError(t, err) + + ann2 := &lnwire.ChannelAnnouncement1{ + ShortChannelID: sharedScid, + NodeID1: nodeIDFromInt(2), + NodeID2: nodeIDFromInt(3), + } + parentID2, err := barrier.InitJobDependencies(ann2) + require.NoError(t, err) + + ann3 := &lnwire.ChannelAnnouncement1{ + ShortChannelID: sharedScid, + NodeID1: sharedNodeID, + NodeID2: nodeIDFromInt(10), + } + parentID3, err := barrier.InitJobDependencies(ann3) + require.NoError(t, err) + + // Create the ChannelUpdate & NodeAnnouncement messages. + upd1 := &lnwire.ChannelUpdate1{ + ShortChannelID: sharedScid, + } + childID1, err := barrier.InitJobDependencies(upd1) + require.NoError(t, err) + + node1 := &lnwire.NodeAnnouncement{ + NodeID: sharedNodeID, + } + childID2, err := barrier.InitJobDependencies(node1) + require.NoError(t, err) + + run := func(vb *graph.ValidationBarrier, childJobID graph.JobID, + job interface{}, resp chan error, start chan error) { + + close(start) + + err := vb.WaitForParents(childJobID, job) + resp <- err + } + + errChan := make(chan error, 2) + + startChan1 := make(chan error, 1) + startChan2 := make(chan error, 1) + + go run(barrier, childID1, upd1, errChan, startChan1) + go run(barrier, childID2, node1, errChan, startChan2) + + // Wait for the start signal since we are testing the case where the + // parent jobs only complete _after_ the child jobs have called. Note + // that there is technically an edge case where we receive the start + // signal and call SignalDependents before WaitForParents can actually + // be called in the goroutine launched above. In this case, which + // arises due to our inability to control precisely when these VB + // methods are scheduled (as they are in different goroutines), the + // test should still pass as we want to test that validation jobs are + // completing and not stalling. In other words, this issue with the + // test itself is good as it actually randomizes some of the ordering, + // occasionally. This tests that the VB is robust against ordering / + // concurrency issues. + select { + case <-startChan1: + case <-time.After(timeout): + t.Fatal("timed out waiting for startChan1") + } + + select { + case <-startChan2: + case <-time.After(timeout): + t.Fatal("timed out waiting for startChan2") + } + + // Now we can call SignalDependents for our parent jobs. + err = barrier.SignalDependents(ann1, parentID1) + require.NoError(t, err) + + err = barrier.SignalDependents(ann2, parentID2) + require.NoError(t, err) + + err = barrier.SignalDependents(ann3, parentID3) + require.NoError(t, err) + + select { + case <-errChan: + case <-time.After(timeout): + t.Fatal("unexpected timeout waiting for first error signal") + } + + select { + case <-errChan: + case <-time.After(timeout): + t.Fatal("unexpected timeout waiting for second error signal") + } +} + // nodeIDFromInt creates a node ID by writing a uint64 to the first 8 bytes. func nodeIDFromInt(i uint64) [33]byte { var nodeID [33]byte