From 4457648cde727d310cb31ffb486e767b3580b92f Mon Sep 17 00:00:00 2001 From: Jiaming Hu Date: Tue, 16 Mar 2021 15:42:43 -0400 Subject: [PATCH] feat: Delete resources in parallel (#626) Delete subscriptions in the same request simultaneously. Delete CR from the same subscription simultaneously. --- .../operandrequest/reconcile_operand.go | 71 +++++++++++++++---- .../operandrequest/reconcile_operator.go | 23 +++++- 2 files changed, 77 insertions(+), 17 deletions(-) diff --git a/controllers/operandrequest/reconcile_operand.go b/controllers/operandrequest/reconcile_operand.go index 4ea463ed..3f57de03 100644 --- a/controllers/operandrequest/reconcile_operand.go +++ b/controllers/operandrequest/reconcile_operand.go @@ -22,6 +22,7 @@ import ( "fmt" "reflect" "strings" + "sync" "time" olmv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1" @@ -286,6 +287,10 @@ func (r *Reconciler) deleteAllCustomResource(ctx context.Context, csv *olmv1alph } merr := &util.MultiErr{} + var ( + mu sync.Mutex + wg sync.WaitGroup + ) for index, opdMember := range customeResourceMap { crShouldBeDeleted := unstructured.Unstructured{ Object: map[string]interface{}{ @@ -296,12 +301,27 @@ func (r *Reconciler) deleteAllCustomResource(ctx context.Context, csv *olmv1alph }, }, } - if err := r.deleteCustomResource(ctx, crShouldBeDeleted, requestInstance.Namespace); err != nil { - merr.Add(err) - } - operatorName := strings.Split(index, "/")[0] - requestInstance.RemoveMemberCRStatus(operatorName, opdMember.Name, opdMember.Kind) + + var ( + operatorName = strings.Split(index, "/")[0] + opdMember = opdMember + ) + + wg.Add(1) + go func() { + defer wg.Done() + if err := r.deleteCustomResource(ctx, crShouldBeDeleted, requestInstance.Namespace); err != nil { + mu.Lock() + defer mu.Unlock() + merr.Add(err) + } + mu.Lock() + defer mu.Unlock() + requestInstance.RemoveMemberCRStatus(operatorName, opdMember.Name, opdMember.Kind) + }() } + wg.Wait() + if len(merr.Errors) != 0 { return merr } @@ -350,16 +370,22 @@ func (r *Reconciler) deleteAllCustomResource(ctx context.Context, csv *olmv1alph continue } if checkLabel(unstruct, map[string]string{constant.OpreqLabel: "true"}) { - err := r.deleteCustomResource(ctx, unstruct, namespace) - if err != nil { - return err - } + wg.Add(1) + go func() { + defer wg.Done() + if err := r.deleteCustomResource(ctx, unstruct, namespace); err != nil { + mu.Lock() + defer mu.Unlock() + merr.Add(err) + } + }() } } } } + wg.Wait() if len(merr.Errors) != 0 { return merr } @@ -591,6 +617,11 @@ func (r *Reconciler) checkCustomResource(ctx context.Context, requestInstance *o } } + var ( + mu sync.Mutex + wg sync.WaitGroup + ) + merr := &util.MultiErr{} for index, opdMember := range customeResourceMap { crShouldBeDeleted := unstructured.Unstructured{ @@ -602,12 +633,24 @@ func (r *Reconciler) checkCustomResource(ctx context.Context, requestInstance *o }, }, } - if err := r.deleteCustomResource(ctx, crShouldBeDeleted, requestInstance.Namespace); err != nil { - merr.Add(err) - } - operatorName := strings.Split(index, "/")[0] - requestInstance.RemoveMemberCRStatus(operatorName, opdMember.Name, opdMember.Kind) + + var ( + operatorName = strings.Split(index, "/")[0] + opdMember = opdMember + ) + wg.Add(1) + go func() { + if err := r.deleteCustomResource(ctx, crShouldBeDeleted, requestInstance.Namespace); err != nil { + mu.Lock() + defer mu.Unlock() + merr.Add(err) + } + mu.Lock() + defer mu.Unlock() + requestInstance.RemoveMemberCRStatus(operatorName, opdMember.Name, opdMember.Kind) + }() } + wg.Wait() if len(merr.Errors) != 0 { return merr diff --git a/controllers/operandrequest/reconcile_operator.go b/controllers/operandrequest/reconcile_operator.go index 2a6d4b4a..5535a530 100644 --- a/controllers/operandrequest/reconcile_operator.go +++ b/controllers/operandrequest/reconcile_operator.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "fmt" + "sync" gset "github.com/deckarep/golang-set" olmv1 "github.com/operator-framework/api/pkg/operators/v1" @@ -265,6 +266,12 @@ func (r *Reconciler) absentOperatorsAndOperands(ctx context.Context, requestInst if err != nil { return err } + + var ( + mu sync.Mutex + wg sync.WaitGroup + ) + for _, req := range requestInstance.Spec.Requests { registryKey := requestInstance.GetRegistryKey(req) registryInstance, err := r.GetOperandRegistry(ctx, registryKey) @@ -277,10 +284,20 @@ func (r *Reconciler) absentOperatorsAndOperands(ctx context.Context, requestInst } merr := &util.MultiErr{} for o := range needDeletedOperands.Iter() { - if err := r.deleteSubscription(ctx, fmt.Sprintf("%v", o), requestInstance, registryInstance, configInstance); err != nil { - merr.Add(err) - } + var ( + o = o + ) + wg.Add(1) + go func() { + defer wg.Done() + if err := r.deleteSubscription(ctx, fmt.Sprintf("%v", o), requestInstance, registryInstance, configInstance); err != nil { + mu.Lock() + defer mu.Unlock() + merr.Add(err) + } + }() } + wg.Wait() if len(merr.Errors) != 0 { return merr }