Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: asoctl deadlock #4475

Merged
merged 5 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions v2/cmd/asoctl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/samber/lo v1.47.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
theunrepentantgeek marked this conversation as resolved.
Show resolved Hide resolved
github.com/spf13/pflag v1.0.5 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions v2/cmd/asoctl/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWR
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
Expand Down
286 changes: 165 additions & 121 deletions v2/cmd/asoctl/pkg/importresources/resource_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

"github.com/go-logr/logr"
"github.com/rotisserie/eris"
"github.com/sourcegraph/conc"
"golang.org/x/exp/maps"
"k8s.io/apimachinery/pkg/runtime"

"github.com/Azure/azure-service-operator/v2/cmd/asoctl/pkg/importreporter"
Expand Down Expand Up @@ -81,116 +83,149 @@ func (ri *ResourceImporter) Import(
ctx context.Context,
done chan struct{},
) (*Result, error) {
workersRequired := ri.desiredWorkers()

candidates := make(chan ImportableResource) // candidates that need to be deduped
pending := make(chan ImportableResource) // importers that are pending import
successes := make(chan ImportResourceResult) // importers that have been executed successfully
failures := make(chan ImportError) // errors from importers that failed
completions := make(chan struct{}) // channel to signal completion
resources := make(chan ImportableResource) // resources to import
theunrepentantgeek marked this conversation as resolved.
Show resolved Hide resolved
allstages := conc.NewWaitGroup()

// Dedupe candidates so we import each distinct resource only once
go ri.queueUniqueImporters(candidates, pending, ri.reporter)
watchdog := ri.startWatchdog(resources)

// Create workers to run the import
for i := 0; i < workersRequired; i++ {
go ri.importWorker(ctx, pending, successes, failures, ri.reporter, completions)
}
pendingResources := ri.startDeduplicator(resources, watchdog, allstages)
successes, failures := ri.startWorkers(ctx, pendingResources, allstages)

// Collate the results
report := newResourceImportReport()
go ri.collateResults(successes, candidates, ri.reporter, report, completions)
go ri.collateErrors(failures, report, completions)
ri.startCollationOfResults(successes, resources, watchdog, allstages, report)
ri.startCollationOfErrors(failures, watchdog, allstages, report)

// Set up by adding our initial resources; these will be completed when we collate their results
for _, rsrc := range ri.resources {
candidates <- rsrc
resources <- rsrc
ri.reporter.AddPending(1)
}

// Wait while everything runs
<-done
allstages.Wait()

// Check for an abort, and return the error if so
if ctx.Err() != nil {
ri.log.Error(ctx.Err(), "Cancelling import.")
return nil, ctx.Err()
}

// Close channels so final reporting and other cleanup occurs
close(candidates)
close(pending)
close(successes)

// Wait for everything to finish
for i := 0; i < workersRequired+2; i++ {
<-completions
}

// Get the summary report and write it
report.WriteToLog(ri.log)

// Now we've imported everything, return the resources
// We do this even if there's an error so that we can return partial results
resources := make([]ImportedResource, 0, len(ri.imported))
for _, imported := range ri.imported {
resources = append(resources, imported)
}

return &Result{
imported: resources,
imported: maps.Values(ri.imported),
}, nil
}

// startWatchdog creates the watchdog used to shut the import pipeline down.
// The watchdog tracks the number of inflight imports; once that count drops
// back to zero, its callback fires and closes the resources channel, which
// in turn causes the downstream stages to drain and exit.
func (ri *ResourceImporter) startWatchdog(
	resources chan ImportableResource,
) *watchdog {
	// Closing the channel is the shutdown signal for the whole pipeline.
	onAllComplete := func() {
		close(resources)
	}

	return newWatchdog(onAllComplete)
}

// startDeduplicator starts a deduplicator goroutine that will ensure each distinct resource is only imported once,
// regardless of how many times we encounter it. We also proactively buffer the pending channel to avoid blocking.
// Any fixed size channel would risk deadlock for a sufficiently large resource graph.
// resources is the channel of resources to deduplicate.
// watchdog is updated to track the number of inflight operations. We increase this every time we see a unique new resource.
// progress is used to report on progress as we go.
// Returns a new channel that will contain only unique resources.
// queueUniqueImporters reads from the candidates channel, putting each importer onto pending exactly once.
// This ensures each distinct resource is only imported once, regardless of how many times we
// encounter it. We also proactively buffer the pending channel to avoid blocking. Any fixed size channel
// would risk deadlock for a sufficiently large resource graph.
func (ri *ResourceImporter) queueUniqueImporters(
candidates <-chan ImportableResource,
pending chan<- ImportableResource,
progress importreporter.Interface,
) {
// This ensures each distinct resource is only imported once,
func (ri *ResourceImporter) startDeduplicator(
resources <-chan ImportableResource,
watchdog *watchdog,
waitgroup *conc.WaitGroup,
) chan ImportableResource {
seen := set.Make[string]()
var queue []ImportableResource
var current ImportableResource = nil

running := true
for running {
// Dequeue from our internal buffer if needed
if current == nil && len(queue) > 0 {
current = queue[0]
queue = queue[1:]
}
uniqueResources := make(chan ImportableResource)

// If we have a current importable to send, use the pending queue
var upstream chan<- ImportableResource = nil
if current != nil {
upstream = pending
}
waitgroup.Go(func() {
running := true
for running {
// Dequeue from our internal buffer if needed
if current == nil && len(queue) > 0 {
current = queue[0]
queue = queue[1:]
}

select {
case rsrc, ok := <-candidates:
if !ok {
// Channel closed
running = false
} else if seen.Contains(rsrc.ID()) {
// We've already seen this resource (we've already queued it for import)
// So remove it from our count of work to be done
ri.log.V(2).Info("Skipping duplicate import", "resource", rsrc.ID())
progress.AddPending(-1)
} else {
// Remember we've seen this, and add it to our queue
seen.Add(rsrc.ID())
queue = append(queue, rsrc)
ri.log.V(2).Info("Buffering import", "resource", rsrc.ID())
// If we have a current importable to send, use the pending queue
var upstream chan<- ImportableResource = nil
if current != nil {
upstream = uniqueResources
}

select {
case rsrc, ok := <-resources:
if !ok {
// Channel closed
running = false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be clearer to label the for loop and use a labelled break here (assuming I haven't missed some other stuff that needs to happen after the select).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

} else if seen.Contains(rsrc.ID()) {
// We've already seen this resource (we've already queued it for import)
// So remove it from our count of work to be done
ri.log.V(2).Info("Skipping duplicate import", "resource", rsrc.ID())
ri.reporter.AddPending(-1)
} else {
// Remember we've seen this, and add it to our queue
watchdog.starting()
seen.Add(rsrc.ID())
queue = append(queue, rsrc)
ri.log.V(2).Info("Buffering import", "resource", rsrc.ID())
}
case upstream <- current:
// We've sent the current importable, so clear it
ri.log.V(2).Info("Queued import", "resource", current.ID())
current = nil
}
case upstream <- current:
// We've sent the current importable, so clear it
ri.log.V(2).Info("Queued import", "resource", current.ID())
current = nil
}

// Close the channel when we're done, so that workers shut down too
close(uniqueResources)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defer this at the top of the goroutine func to ensure it's closed even in the case of panics?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

})

return uniqueResources
}

// startWorkers starts the worker goroutines that will import resources.
// ctx is used to check for cancellation.
// resources is the channel of unique resources to import.
// waitgroup is used to track the number of inflight goroutines.
// returns two channels: one for successful imports, and one for errors.
func (ri *ResourceImporter) startWorkers(
ctx context.Context,
resources <-chan ImportableResource,
waitgroup *conc.WaitGroup,
) (chan ImportResourceResult, chan ImportError) {
successes := make(chan ImportResourceResult) // importers that have been executed successfully
failures := make(chan ImportError) // errors from importers that failed
theunrepentantgeek marked this conversation as resolved.
Show resolved Hide resolved

wg := conc.NewWaitGroup()
workersRequired := ri.desiredWorkers()
for i := 0; i < workersRequired; i++ {
wg.Go(func() {
ri.importWorker(ctx, resources, successes, failures, ri.reporter)
})
}

// Once all the workers are done, close the channels
waitgroup.Go(func() {
wg.Wait()
close(successes)
close(failures)
})

return successes, failures
}

// importerWorker is a goroutine for importing resources.
Expand All @@ -207,7 +242,6 @@ func (ri *ResourceImporter) importWorker(
completed chan<- ImportResourceResult,
failed chan<- ImportError,
progress importreporter.Interface,
done chan<- struct{},
theunrepentantgeek marked this conversation as resolved.
Show resolved Hide resolved
) {
for rsrc := range pending {
if ctx.Err() != nil {
Expand All @@ -225,68 +259,78 @@ func (ri *ResourceImporter) importWorker(
completed <- imported
}
}

done <- struct{}{}
}

func (ri *ResourceImporter) collateResults(
// startCollationOfResults starts a goroutine that will collate the results of imports.
// completed is the channel of completed imports to drain.
// candidates is the channel that receives any new resources to import.
// watchdog is updated to track the number of inflight operations.
// waitgroup is used to track running goroutines.
// report is the report to write to.
func (ri *ResourceImporter) startCollationOfResults(
completed <-chan ImportResourceResult, // completed imports for us to collate
candidates chan<- ImportableResource, // additional candidates for importing
progress importreporter.Interface, // importreporter tracking
watchdog *watchdog,
waitgroup *conc.WaitGroup,
report *resourceImportReport, // report to write to
done chan<- struct{}, // channel to signal completion
) {
for importResult := range completed {
rsrc := importResult.resource
gk := rsrc.GroupKind()

// Enqueue any child resources we found; these will be marked as completed when we collate their results
progress.AddPending(len(importResult.pending))
for _, p := range importResult.pending {
candidates <- p
}

ri.log.Info(
"Imported",
"kind", gk,
"name", rsrc.Name())
waitgroup.Go(func() {
for importResult := range completed {
rsrc := importResult.resource
gk := rsrc.GroupKind()

// Enqueue any child resources we found; these will be marked as completed when we collate their results
ri.reporter.AddPending(len(importResult.pending))
for _, p := range importResult.pending {
candidates <- p
}

report.AddSuccessfulImport(gk)
ri.imported[rsrc.ID()] = rsrc
ri.log.Info(
"Imported",
"kind", gk,
"name", rsrc.Name())

// Flag the main resource as complete
// We do this after everything else because it might indicate we're finished
progress.Completed(1)
}
report.AddSuccessfulImport(gk)
ri.imported[rsrc.ID()] = rsrc

done <- struct{}{}
// Flag the main resource as complete
// We do this after everything else because it might indicate we're finished
ri.reporter.Completed(1)
watchdog.stopped()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd put these into a method on ResourceImporter and then call it from both the success and error workers. Tying them together makes it harder to forget to do it in one place.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
})
}

func (ri *ResourceImporter) collateErrors(
// startCollationOfErrors starts a goroutine that will collate all the errors that occurred during import.
func (ri *ResourceImporter) startCollationOfErrors(
failures <-chan ImportError,
watchdog *watchdog,
waitgroup *conc.WaitGroup,
report *resourceImportReport,
done chan<- struct{},
) {
for ie := range failures {
var skipped *SkippedError
if eris.As(ie.err, &skipped) {
ri.log.V(1).Info(
"Skipped",
"kind", ie.gk,
"name", ie.name,
"because", skipped.Because)
report.AddSkippedImport(ie.gk, skipped.Because)
} else {
ri.log.Error(ie.err,
"Failed",
"kind", ie.gk,
"name", ie.name)
waitgroup.Go(func() {
for ie := range failures {
var skipped *SkippedError
if eris.As(ie.err, &skipped) {
ri.log.V(1).Info(
"Skipped",
"kind", ie.gk,
"name", ie.name,
"because", skipped.Because)
report.AddSkippedImport(ie.gk, skipped.Because)
} else {
ri.log.Error(ie.err,
"Failed",
"kind", ie.gk,
"name", ie.name)

report.AddFailedImport(ie.gk, ie.err.Error())
}
}
report.AddFailedImport(ie.gk, ie.err.Error())
}

done <- struct{}{}
ri.reporter.Completed(1)
watchdog.stopped()
}
})
}

func (ri *ResourceImporter) importResource(
Expand Down
Loading