Skip to content

Commit

Permalink
feat: Delete resources in parallel (#626)
Browse files Browse the repository at this point in the history
Delete subscriptions in the same request simultaneously.
Delete CR from the same subscription simultaneously.
  • Loading branch information
horis233 authored Mar 16, 2021
1 parent d820a46 commit 4457648
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 17 deletions.
71 changes: 57 additions & 14 deletions controllers/operandrequest/reconcile_operand.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"reflect"
"strings"
"sync"
"time"

olmv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
Expand Down Expand Up @@ -286,6 +287,10 @@ func (r *Reconciler) deleteAllCustomResource(ctx context.Context, csv *olmv1alph
}

merr := &util.MultiErr{}
var (
mu sync.Mutex
wg sync.WaitGroup
)
for index, opdMember := range customeResourceMap {
crShouldBeDeleted := unstructured.Unstructured{
Object: map[string]interface{}{
Expand All @@ -296,12 +301,27 @@ func (r *Reconciler) deleteAllCustomResource(ctx context.Context, csv *olmv1alph
},
},
}
if err := r.deleteCustomResource(ctx, crShouldBeDeleted, requestInstance.Namespace); err != nil {
merr.Add(err)
}
operatorName := strings.Split(index, "/")[0]
requestInstance.RemoveMemberCRStatus(operatorName, opdMember.Name, opdMember.Kind)

var (
operatorName = strings.Split(index, "/")[0]
opdMember = opdMember
)

wg.Add(1)
go func() {
defer wg.Done()
if err := r.deleteCustomResource(ctx, crShouldBeDeleted, requestInstance.Namespace); err != nil {
mu.Lock()
defer mu.Unlock()
merr.Add(err)
}
mu.Lock()
defer mu.Unlock()
requestInstance.RemoveMemberCRStatus(operatorName, opdMember.Name, opdMember.Kind)
}()
}
wg.Wait()

if len(merr.Errors) != 0 {
return merr
}
Expand Down Expand Up @@ -350,16 +370,22 @@ func (r *Reconciler) deleteAllCustomResource(ctx context.Context, csv *olmv1alph
continue
}
if checkLabel(unstruct, map[string]string{constant.OpreqLabel: "true"}) {
err := r.deleteCustomResource(ctx, unstruct, namespace)
if err != nil {
return err
}
wg.Add(1)
go func() {
defer wg.Done()
if err := r.deleteCustomResource(ctx, unstruct, namespace); err != nil {
mu.Lock()
defer mu.Unlock()
merr.Add(err)
}
}()
}

}

}
}
wg.Wait()
if len(merr.Errors) != 0 {
return merr
}
Expand Down Expand Up @@ -591,6 +617,11 @@ func (r *Reconciler) checkCustomResource(ctx context.Context, requestInstance *o
}
}

var (
mu sync.Mutex
wg sync.WaitGroup
)

merr := &util.MultiErr{}
for index, opdMember := range customeResourceMap {
crShouldBeDeleted := unstructured.Unstructured{
Expand All @@ -602,12 +633,24 @@ func (r *Reconciler) checkCustomResource(ctx context.Context, requestInstance *o
},
},
}
if err := r.deleteCustomResource(ctx, crShouldBeDeleted, requestInstance.Namespace); err != nil {
merr.Add(err)
}
operatorName := strings.Split(index, "/")[0]
requestInstance.RemoveMemberCRStatus(operatorName, opdMember.Name, opdMember.Kind)

var (
operatorName = strings.Split(index, "/")[0]
opdMember = opdMember
)
wg.Add(1)
go func() {
if err := r.deleteCustomResource(ctx, crShouldBeDeleted, requestInstance.Namespace); err != nil {
mu.Lock()
defer mu.Unlock()
merr.Add(err)
}
mu.Lock()
defer mu.Unlock()
requestInstance.RemoveMemberCRStatus(operatorName, opdMember.Name, opdMember.Kind)
}()
}
wg.Wait()

if len(merr.Errors) != 0 {
return merr
Expand Down
23 changes: 20 additions & 3 deletions controllers/operandrequest/reconcile_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"sync"

gset "github.com/deckarep/golang-set"
olmv1 "github.com/operator-framework/api/pkg/operators/v1"
Expand Down Expand Up @@ -265,6 +266,12 @@ func (r *Reconciler) absentOperatorsAndOperands(ctx context.Context, requestInst
if err != nil {
return err
}

var (
mu sync.Mutex
wg sync.WaitGroup
)

for _, req := range requestInstance.Spec.Requests {
registryKey := requestInstance.GetRegistryKey(req)
registryInstance, err := r.GetOperandRegistry(ctx, registryKey)
Expand All @@ -277,10 +284,20 @@ func (r *Reconciler) absentOperatorsAndOperands(ctx context.Context, requestInst
}
merr := &util.MultiErr{}
for o := range needDeletedOperands.Iter() {
if err := r.deleteSubscription(ctx, fmt.Sprintf("%v", o), requestInstance, registryInstance, configInstance); err != nil {
merr.Add(err)
}
var (
o = o
)
wg.Add(1)
go func() {
defer wg.Done()
if err := r.deleteSubscription(ctx, fmt.Sprintf("%v", o), requestInstance, registryInstance, configInstance); err != nil {
mu.Lock()
defer mu.Unlock()
merr.Add(err)
}
}()
}
wg.Wait()
if len(merr.Errors) != 0 {
return merr
}
Expand Down

0 comments on commit 4457648

Please sign in to comment.