Skip to content

Commit

Permalink
Fix: asoctl deadlock (#4475)
Browse files Browse the repository at this point in the history
* Create watchdog to shut down when done

* Refactor concurrency to fix deadlock

* Add conc library

* Address PR feedback

* Address PR comments
  • Loading branch information
theunrepentantgeek authored Dec 5, 2024
1 parent 32c6c25 commit 3e3f11a
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 121 deletions.
1 change: 1 addition & 0 deletions v2/cmd/asoctl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/samber/lo v1.47.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions v2/cmd/asoctl/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWR
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
Expand Down
288 changes: 167 additions & 121 deletions v2/cmd/asoctl/pkg/importresources/resource_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

"github.com/go-logr/logr"
"github.com/rotisserie/eris"
"github.com/sourcegraph/conc"
"golang.org/x/exp/maps"
"k8s.io/apimachinery/pkg/runtime"

"github.com/Azure/azure-service-operator/v2/cmd/asoctl/pkg/importreporter"
Expand Down Expand Up @@ -81,116 +83,149 @@ func (ri *ResourceImporter) Import(
ctx context.Context,
done chan struct{},
) (*Result, error) {
workersRequired := ri.desiredWorkers()

candidates := make(chan ImportableResource) // candidates that need to be deduped
pending := make(chan ImportableResource) // importers that are pending import
successes := make(chan ImportResourceResult) // importers that have been executed successfully
failures := make(chan ImportError) // errors from importers that failed
completions := make(chan struct{}) // channel to signal completion
resources := make(chan ImportableResource) // resources to import
allstages := conc.NewWaitGroup()

// Dedupe candidates so we import each distinct resource only once
go ri.queueUniqueImporters(candidates, pending, ri.reporter)
watchdog := ri.startWatchdog(resources)

// Create workers to run the import
for i := 0; i < workersRequired; i++ {
go ri.importWorker(ctx, pending, successes, failures, ri.reporter, completions)
}
pendingResources := ri.startDeduplicator(resources, watchdog, allstages)
successes, failures := ri.startWorkers(ctx, pendingResources, allstages)

// Collate the results
report := newResourceImportReport()
go ri.collateResults(successes, candidates, ri.reporter, report, completions)
go ri.collateErrors(failures, report, completions)
ri.startCollationOfResults(successes, resources, watchdog, allstages, report)
ri.startCollationOfErrors(failures, watchdog, allstages, report)

// Set up by adding our initial resources; these will be completed when we collate their results
for _, rsrc := range ri.resources {
candidates <- rsrc
resources <- rsrc
ri.reporter.AddPending(1)
}

// Wait while everything runs
<-done
allstages.Wait()

// Check for an abort, and return the error if so
if ctx.Err() != nil {
ri.log.Error(ctx.Err(), "Cancelling import.")
return nil, ctx.Err()
}

// Close channels so final reporting and other cleanup occurs
close(candidates)
close(pending)
close(successes)

// Wait for everything to finish
for i := 0; i < workersRequired+2; i++ {
<-completions
}

// Get the summary report and write it
report.WriteToLog(ri.log)

// Now we've imported everything, return the resources
// We do this even if there's an error so that we can return partial results
resources := make([]ImportedResource, 0, len(ri.imported))
for _, imported := range ri.imported {
resources = append(resources, imported)
}

return &Result{
imported: resources,
imported: maps.Values(ri.imported),
}, nil
}

// startWatchdog creates the watchdog that triggers shutdown once all imports
// are complete. The watchdog tracks the number of inflight operations; when
// that count reaches zero it invokes the callback below, closing the
// resources channel so every downstream stage drains and exits.
func (ri *ResourceImporter) startWatchdog(
	resources chan ImportableResource,
) *watchdog {
	return newWatchdog(func() {
		close(resources)
	})
}

// startDeduplicator starts a deduplicator goroutine that will ensure each distinct resource is only imported once,
// regardless of how many times we encounter it. We also proactively buffer the pending channel to avoid blocking.
// Any fixed size channel would risk deadlock for a sufficiently large resource graph.
// resources is the channel of resources to deduplicate.
// watchdog is updated to track the number of inflight operations. We increase this every time we see a unique new resource.
// progress is used to report on progress as we go.
// Returns a new channel that will contain only unique resources.
// queueUniqueImporters reads from the candidates channel, putting each importer onto pending exactly once.
// This ensures each distinct resource is only imported once, regardless of how many times we
// encounter it. We also proactively buffer the pending channel to avoid blocking. Any fixed size channel
// would risk deadlock for a sufficiently large resource graph.
func (ri *ResourceImporter) queueUniqueImporters(
candidates <-chan ImportableResource,
pending chan<- ImportableResource,
progress importreporter.Interface,
) {
// This ensures each distinct resource is only imported once,
func (ri *ResourceImporter) startDeduplicator(
resources <-chan ImportableResource,
watchdog *watchdog,
waitgroup *conc.WaitGroup,
) chan ImportableResource {
seen := set.Make[string]()
var queue []ImportableResource
var current ImportableResource = nil

running := true
for running {
// Dequeue from our internal buffer if needed
if current == nil && len(queue) > 0 {
current = queue[0]
queue = queue[1:]
}
uniqueResources := make(chan ImportableResource)

// If we have a current importable to send, use the pending queue
var upstream chan<- ImportableResource = nil
if current != nil {
upstream = pending
}
// Close the channel when we're done, so that workers shut down too
defer close(uniqueResources)

select {
case rsrc, ok := <-candidates:
if !ok {
// Channel closed
running = false
} else if seen.Contains(rsrc.ID()) {
// We've already seen this resource (we've already queued it for import)
// So remove it from our count of work to be done
ri.log.V(2).Info("Skipping duplicate import", "resource", rsrc.ID())
progress.AddPending(-1)
} else {
// Remember we've seen this, and add it to our queue
seen.Add(rsrc.ID())
queue = append(queue, rsrc)
ri.log.V(2).Info("Buffering import", "resource", rsrc.ID())
waitgroup.Go(func() {
run:
for {
// Dequeue from our internal buffer if needed
if current == nil && len(queue) > 0 {
current = queue[0]
queue = queue[1:]
}

// If we have a current importable to send, use the pending queue
var upstream chan<- ImportableResource = nil
if current != nil {
upstream = uniqueResources
}

select {
case rsrc, ok := <-resources:
if !ok {
// Channel closed
break run
} else if seen.Contains(rsrc.ID()) {
// We've already seen this resource (we've already queued it for import)
// So remove it from our count of work to be done
ri.log.V(2).Info("Skipping duplicate import", "resource", rsrc.ID())
ri.reporter.AddPending(-1)
} else {
// Remember we've seen this, and add it to our queue
watchdog.starting()
seen.Add(rsrc.ID())
queue = append(queue, rsrc)
ri.log.V(2).Info("Buffering import", "resource", rsrc.ID())
}
case upstream <- current:
// We've sent the current importable, so clear it
ri.log.V(2).Info("Queued import", "resource", current.ID())
current = nil
}
case upstream <- current:
// We've sent the current importable, so clear it
ri.log.V(2).Info("Queued import", "resource", current.ID())
current = nil
}
})

return uniqueResources
}

// startWorkers launches the pool of goroutines that perform the actual imports.
// ctx is used to check for cancellation.
// resources supplies the unique resources awaiting import.
// waitgroup tracks the goroutine that closes the result channels once the pool drains.
// Returns a channel of successful imports and a channel of import errors.
func (ri *ResourceImporter) startWorkers(
	ctx context.Context,
	resources <-chan ImportableResource,
	waitgroup *conc.WaitGroup,
) (chan ImportResourceResult, chan ImportError) {
	successes := make(chan ImportResourceResult) // results from imports that succeeded
	failures := make(chan ImportError)           // errors from imports that failed

	workerCount := ri.desiredWorkers()
	pool := conc.NewWaitGroup()
	for worker := 0; worker < workerCount; worker++ {
		pool.Go(func() {
			ri.importWorker(ctx, resources, successes, failures, ri.reporter)
		})
	}

	// Once every worker has exited, close both result channels so the
	// downstream collation goroutines terminate as well.
	waitgroup.Go(func() {
		pool.Wait()
		close(successes)
		close(failures)
	})

	return successes, failures
}

// importerWorker is a goroutine for importing resources.
Expand All @@ -200,14 +235,12 @@ func (ri *ResourceImporter) queueUniqueImporters(
// pending is a source of resources to import.
// completed is where we send the result of a successful import.
// failed is where we send the error from a failed import.
// done is a channel we signal when we're finished.
func (ri *ResourceImporter) importWorker(
ctx context.Context,
pending <-chan ImportableResource,
completed chan<- ImportResourceResult,
failed chan<- ImportError,
progress importreporter.Interface,
done chan<- struct{},
) {
for rsrc := range pending {
if ctx.Err() != nil {
Expand All @@ -225,68 +258,81 @@ func (ri *ResourceImporter) importWorker(
completed <- imported
}
}

done <- struct{}{}
}

func (ri *ResourceImporter) collateResults(
// startCollationOfResults starts a goroutine that will collate the results of imports.
// completed is the channel of completed imports to drain.
// candidates is the channel that receives any new resources to import.
// watchdog is updated to track the number of inflight operations.
// waitgroup is used to track running goroutines.
// report is the report to write to.
func (ri *ResourceImporter) startCollationOfResults(
completed <-chan ImportResourceResult, // completed imports for us to collate
candidates chan<- ImportableResource, // additional candidates for importing
progress importreporter.Interface, // importreporter tracking
watchdog *watchdog,
waitgroup *conc.WaitGroup,
report *resourceImportReport, // report to write to
done chan<- struct{}, // channel to signal completion
) {
for importResult := range completed {
rsrc := importResult.resource
gk := rsrc.GroupKind()

// Enqueue any child resources we found; these will be marked as completed when we collate their results
progress.AddPending(len(importResult.pending))
for _, p := range importResult.pending {
candidates <- p
}

ri.log.Info(
"Imported",
"kind", gk,
"name", rsrc.Name())
waitgroup.Go(func() {
for importResult := range completed {
rsrc := importResult.resource
gk := rsrc.GroupKind()

// Enqueue any child resources we found; these will be marked as completed when we collate their results
ri.reporter.AddPending(len(importResult.pending))
for _, p := range importResult.pending {
candidates <- p
}

report.AddSuccessfulImport(gk)
ri.imported[rsrc.ID()] = rsrc
ri.log.Info(
"Imported",
"kind", gk,
"name", rsrc.Name())

// Flag the main resource as complete
// We do this after everything else because it might indicate we're finished
progress.Completed(1)
}
report.AddSuccessfulImport(gk)
ri.imported[rsrc.ID()] = rsrc

done <- struct{}{}
ri.completed(watchdog)
}
})
}

func (ri *ResourceImporter) collateErrors(
// startCollationOfErrors starts a goroutine that will collate all the errors that occurred during import.
func (ri *ResourceImporter) startCollationOfErrors(
failures <-chan ImportError,
watchdog *watchdog,
waitgroup *conc.WaitGroup,
report *resourceImportReport,
done chan<- struct{},
) {
for ie := range failures {
var skipped *SkippedError
if eris.As(ie.err, &skipped) {
ri.log.V(1).Info(
"Skipped",
"kind", ie.gk,
"name", ie.name,
"because", skipped.Because)
report.AddSkippedImport(ie.gk, skipped.Because)
} else {
ri.log.Error(ie.err,
"Failed",
"kind", ie.gk,
"name", ie.name)
waitgroup.Go(func() {
for ie := range failures {
var skipped *SkippedError
if eris.As(ie.err, &skipped) {
ri.log.V(1).Info(
"Skipped",
"kind", ie.gk,
"name", ie.name,
"because", skipped.Because)
report.AddSkippedImport(ie.gk, skipped.Because)
} else {
ri.log.Error(ie.err,
"Failed",
"kind", ie.gk,
"name", ie.name)

report.AddFailedImport(ie.gk, ie.err.Error())
}

report.AddFailedImport(ie.gk, ie.err.Error())
ri.completed(watchdog)
}
}
})
}

done <- struct{}{}
// completed is used to indicate a resource has been fully processed.
// We report progress before notifying the watchdog because the watchdog
// call might indicate we've completed the entire process and so trigger
// shutdown.
func (ri *ResourceImporter) completed(watchdog *watchdog) {
	ri.reporter.Completed(1)
	watchdog.stopped()
}

func (ri *ResourceImporter) importResource(
Expand Down
Loading

0 comments on commit 3e3f11a

Please sign in to comment.