diff --git a/v2/cmd/asoctl/go.mod b/v2/cmd/asoctl/go.mod index b831b2dd425..cbdf045731f 100644 --- a/v2/cmd/asoctl/go.mod +++ b/v2/cmd/asoctl/go.mod @@ -117,6 +117,7 @@ require ( github.com/prometheus/procfs v0.15.1 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/samber/lo v1.47.0 // indirect + github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect github.com/x448/float16 v0.8.4 // indirect diff --git a/v2/cmd/asoctl/go.sum b/v2/cmd/asoctl/go.sum index a14ed861c07..14ae280e97d 100644 --- a/v2/cmd/asoctl/go.sum +++ b/v2/cmd/asoctl/go.sum @@ -253,6 +253,8 @@ github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWR github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc= github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= diff --git a/v2/cmd/asoctl/pkg/importresources/resource_importer.go b/v2/cmd/asoctl/pkg/importresources/resource_importer.go index 796a2176a3b..ed47a46771a 100644 --- a/v2/cmd/asoctl/pkg/importresources/resource_importer.go +++ b/v2/cmd/asoctl/pkg/importresources/resource_importer.go @@ -11,6 +11,8 @@ import ( "github.com/go-logr/logr" "github.com/rotisserie/eris" + "github.com/sourcegraph/conc" + "golang.org/x/exp/maps" "k8s.io/apimachinery/pkg/runtime" "github.com/Azure/azure-service-operator/v2/cmd/asoctl/pkg/importreporter" @@ -81,35 +83,25 @@ func (ri *ResourceImporter) Import( ctx context.Context, done chan struct{}, ) (*Result, error) { - workersRequired := ri.desiredWorkers() - - candidates := make(chan ImportableResource) // candidates that need to be deduped - pending := make(chan ImportableResource) // importers that are pending import - successes := make(chan ImportResourceResult) // importers that have been executed successfully - failures := make(chan ImportError) // errors from importers that failed - completions := make(chan struct{}) // channel to signal completion + resources := make(chan ImportableResource) // resources to import + allstages := conc.NewWaitGroup() - // Dedupe candidates so we import each distinct resource only once - go ri.queueUniqueImporters(candidates, pending, ri.reporter) + watchdog := ri.startWatchdog(resources) - // Create workers to run the import - for i := 0; i < workersRequired; i++ { - go ri.importWorker(ctx, pending, successes, failures, ri.reporter, completions) - } + pendingResources := ri.startDeduplicator(resources, watchdog, allstages) + successes, failures := ri.startWorkers(ctx, pendingResources, allstages) - // Collate the results report := newResourceImportReport() - go ri.collateResults(successes, candidates, ri.reporter, report, completions) - go ri.collateErrors(failures, report, completions) + ri.startCollationOfResults(successes, resources, watchdog, allstages, report) + ri.startCollationOfErrors(failures, watchdog, allstages, report) // Set up by adding our initial resources; these will be completed when we collate their results for _, rsrc := range ri.resources { - candidates <- rsrc + resources <- rsrc ri.reporter.AddPending(1) } - // Wait while everything runs - <-done + allstages.Wait() // Check for an abort, and return the error if so if ctx.Err() != nil { @@ -117,80 +109,123 @@ func (ri *ResourceImporter) Import( return nil, ctx.Err() } - // Close channels so final reporting and other cleanup occurs - close(candidates) - close(pending) - close(successes) - - // Wait for everything to finish - for i := 0; i < workersRequired+2; i++ { - <-completions - } - // Get the summary report and write it report.WriteToLog(ri.log) // Now we've imported everything, return the resources // We do this even if there's an error so that we can return partial results - resources := make([]ImportedResource, 0, len(ri.imported)) - for _, imported := range ri.imported { - resources = append(resources, imported) - } - return &Result{ - imported: resources, + imported: maps.Values(ri.imported), }, nil } +// startWatchdog starts a watchdog goroutine that will initiate shutdown once all imports are complete. +// We keep track of the number of inflight imports, and once that reaches zero, we close the channel. +func (ri *ResourceImporter) startWatchdog( + resources chan ImportableResource, +) *watchdog { + watchdog := newWatchdog(func() { + close(resources) + }) + + return watchdog +} + +// startDeduplicator starts a deduplicator goroutine that will ensure each distinct resource is only imported once, +// regardless of how many times we encounter it. We also proactively buffer the pending channel to avoid blocking. +// Any fixed size channel would risk deadlock for a sufficiently large resource graph. +// resources is the channel of resources to deduplicate. +// watchdog is updated to track the number of inflight operations. We increase this every time we see a unique new resource. +// progress is used to report on progress as we go. +// Returns a new channel that will contain only unique resources. // queueUniqueImporters reads from the candidates channel, putting each importer onto pending exactly once. -// This ensures each distinct resource is only imported once, regardless of how many times we -// encounter it. We also proactively buffer the pending channel to avoid blocking. Any fixed size channel -// would risk deadlock for a sufficiently large resource graph. -func (ri *ResourceImporter) queueUniqueImporters( - candidates <-chan ImportableResource, - pending chan<- ImportableResource, - progress importreporter.Interface, -) { +// This ensures each distinct resource is only imported once, +func (ri *ResourceImporter) startDeduplicator( + resources <-chan ImportableResource, + watchdog *watchdog, + waitgroup *conc.WaitGroup, +) chan ImportableResource { seen := set.Make[string]() var queue []ImportableResource var current ImportableResource = nil - running := true - for running { - // Dequeue from our internal buffer if needed - if current == nil && len(queue) > 0 { - current = queue[0] - queue = queue[1:] - } + uniqueResources := make(chan ImportableResource) - // If we have a current importable to send, use the pending queue - var upstream chan<- ImportableResource = nil - if current != nil { - upstream = pending - } + // Close the channel when we're done, so that workers shut down too + defer close(uniqueResources) - select { - case rsrc, ok := <-candidates: - if !ok { - // Channel closed - running = false - } else if seen.Contains(rsrc.ID()) { - // We've already seen this resource (we've already queued it for import) - // So remove it from our count of work to be done - ri.log.V(2).Info("Skipping duplicate import", "resource", rsrc.ID()) - progress.AddPending(-1) - } else { - // Remember we've seen this, and add it to our queue - seen.Add(rsrc.ID()) - queue = append(queue, rsrc) - ri.log.V(2).Info("Buffering import", "resource", rsrc.ID()) + waitgroup.Go(func() { + run: + for { + // Dequeue from our internal buffer if needed + if current == nil && len(queue) > 0 { + current = queue[0] + queue = queue[1:] + } + + // If we have a current importable to send, use the pending queue + var upstream chan<- ImportableResource = nil + if current != nil { + upstream = uniqueResources + } + + select { + case rsrc, ok := <-resources: + if !ok { + // Channel closed + break run + } else if seen.Contains(rsrc.ID()) { + // We've already seen this resource (we've already queued it for import) + // So remove it from our count of work to be done + ri.log.V(2).Info("Skipping duplicate import", "resource", rsrc.ID()) + ri.reporter.AddPending(-1) + } else { + // Remember we've seen this, and add it to our queue + watchdog.starting() + seen.Add(rsrc.ID()) + queue = append(queue, rsrc) + ri.log.V(2).Info("Buffering import", "resource", rsrc.ID()) + } + case upstream <- current: + // We've sent the current importable, so clear it + ri.log.V(2).Info("Queued import", "resource", current.ID()) + current = nil } - case upstream <- current: - // We've sent the current importable, so clear it - ri.log.V(2).Info("Queued import", "resource", current.ID()) - current = nil } + }) + + return uniqueResources +} + +// startWorkers starts the worker goroutines that will import resources. +// ctx is used to check for cancellation. +// resources is the channel of unique resources to import. +// waitgroup is used to track the number of inflight goroutines. +// returns two channels: one for successful imports, and one for errors. +func (ri *ResourceImporter) startWorkers( + ctx context.Context, + resources <-chan ImportableResource, + waitgroup *conc.WaitGroup, +) (chan ImportResourceResult, chan ImportError) { + successes := make(chan ImportResourceResult) // importers that have been executed successfully + failures := make(chan ImportError) // errors from importers that failed + + wg := conc.NewWaitGroup() + workersRequired := ri.desiredWorkers() + for i := 0; i < workersRequired; i++ { + wg.Go(func() { + ri.importWorker(ctx, resources, successes, failures, ri.reporter) + }) } + + // Once all the workers are done, close the channels + waitgroup.Go(func() { + wg.Wait() + close(successes) + close(failures) + }) + + return successes, failures } // importerWorker is a goroutine for importing resources. @@ -200,14 +235,12 @@ func (ri *ResourceImporter) queueUniqueImporters( // pending is a source of resources to import. // completed is where we send the result of a successful import. // failed is where we send the error from a failed import. -// done is a channel we signal when we're finished. func (ri *ResourceImporter) importWorker( ctx context.Context, pending <-chan ImportableResource, completed chan<- ImportResourceResult, failed chan<- ImportError, progress importreporter.Interface, - done chan<- struct{}, ) { for rsrc := range pending { if ctx.Err() != nil { @@ -225,68 +258,81 @@ func (ri *ResourceImporter) importWorker( completed <- imported } } - - done <- struct{}{} } -func (ri *ResourceImporter) collateResults( +// startCollationOfResults starts a goroutine that will collate the results of imports. +// completed is the channel of completed imports to drain. +// candidates is the channel that receives any new resources to import. +// watchdog is updated to track the number of inflight operations. +// waitgroup is used to track running goroutines. +// report is the report to write to. +func (ri *ResourceImporter) startCollationOfResults( completed <-chan ImportResourceResult, // completed imports for us to collate candidates chan<- ImportableResource, // additional candidates for importing - progress importreporter.Interface, // importreporter tracking + watchdog *watchdog, + waitgroup *conc.WaitGroup, report *resourceImportReport, // report to write to - done chan<- struct{}, // channel to signal completion ) { - for importResult := range completed { - rsrc := importResult.resource - gk := rsrc.GroupKind() - - // Enqueue any child resources we found; these will be marked as completed when we collate their results - progress.AddPending(len(importResult.pending)) - for _, p := range importResult.pending { - candidates <- p - } - - ri.log.Info( - "Imported", - "kind", gk, - "name", rsrc.Name()) + waitgroup.Go(func() { + for importResult := range completed { + rsrc := importResult.resource + gk := rsrc.GroupKind() + + // Enqueue any child resources we found; these will be marked as completed when we collate their results + ri.reporter.AddPending(len(importResult.pending)) + for _, p := range importResult.pending { + candidates <- p + } - report.AddSuccessfulImport(gk) - ri.imported[rsrc.ID()] = rsrc + ri.log.Info( + "Imported", + "kind", gk, + "name", rsrc.Name()) - // Flag the main resource as complete - // We do this after everything else because it might indicate we're finished - progress.Completed(1) - } + report.AddSuccessfulImport(gk) + ri.imported[rsrc.ID()] = rsrc - done <- struct{}{} + ri.completed(watchdog) + } + }) } -func (ri *ResourceImporter) collateErrors( +// startCollationOfErrors starts a goroutine that will collage all the errors that occurred during import. +func (ri *ResourceImporter) startCollationOfErrors( failures <-chan ImportError, + watchdog *watchdog, + waitgroup *conc.WaitGroup, report *resourceImportReport, - done chan<- struct{}, ) { - for ie := range failures { - var skipped *SkippedError - if eris.As(ie.err, &skipped) { - ri.log.V(1).Info( - "Skipped", - "kind", ie.gk, - "name", ie.name, - "because", skipped.Because) - report.AddSkippedImport(ie.gk, skipped.Because) - } else { - ri.log.Error(ie.err, - "Failed", - "kind", ie.gk, - "name", ie.name) + waitgroup.Go(func() { + for ie := range failures { + var skipped *SkippedError + if eris.As(ie.err, &skipped) { + ri.log.V(1).Info( + "Skipped", + "kind", ie.gk, + "name", ie.name, + "because", skipped.Because) + report.AddSkippedImport(ie.gk, skipped.Because) + } else { + ri.log.Error(ie.err, + "Failed", + "kind", ie.gk, + "name", ie.name) + + report.AddFailedImport(ie.gk, ie.err.Error()) + } - report.AddFailedImport(ie.gk, ie.err.Error()) + ri.completed(watchdog) } - } + }) +} - done <- struct{}{} +// completed is used to indicate a resource has been fully processed. +// We do this after everything else because it might indicate we're completed the entire process. +func (ri *ResourceImporter) completed(watchdog *watchdog) { + ri.reporter.Completed(1) + watchdog.stopped() } func (ri *ResourceImporter) importResource( diff --git a/v2/cmd/asoctl/pkg/importresources/watchdog.go b/v2/cmd/asoctl/pkg/importresources/watchdog.go new file mode 100644 index 00000000000..9ec75066177 --- /dev/null +++ b/v2/cmd/asoctl/pkg/importresources/watchdog.go @@ -0,0 +1,39 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT license. + */ + +package importresources + +import ( + "sync/atomic" +) + +// watchdog keeps track of the number of inflight operations and triggers an action when they all complete. +type watchdog struct { + inflight atomic.Int32 + action func() +} + +// newWatchdog returns a new watchdog instance +func newWatchdog( + action func(), +) *watchdog { + return &watchdog{ + action: action, + } +} + +// starting increments the number of inflight operations +func (w *watchdog) starting() { + w.inflight.Add(1) +} + +// stopped decrements the number of inflight operations and triggers the action if there are no +// remaining inflight operations +func (w *watchdog) stopped() { + count := w.inflight.Add(-1) + if count == 0 { + w.action() + } +}