Skip to content

Commit

Permalink
Add composite error to async jobs (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirilKabakchiev authored and DimitarPetrov committed Dec 6, 2019
1 parent 97cbd4c commit e4934f7
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 53 deletions.
12 changes: 6 additions & 6 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

[[constraint]]
name = "github.com/Peripli/service-broker-proxy"
version = "=0.7.0"
branch = "master"

[[constraint]]
name = "github.com/cloudfoundry-community/go-cfenv"
Expand Down
22 changes: 6 additions & 16 deletions cf/service_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import (
"fmt"
"net/http"
"net/url"
"strings"

"github.com/Peripli/service-broker-proxy/pkg/sbproxy/reconcile"

"github.com/Peripli/service-broker-proxy/pkg/platform"

Expand All @@ -33,19 +34,7 @@ func (pc *PlatformClient) DisableAccessForPlan(ctx context.Context, request *pla
return pc.updateAccessForPlan(ctx, request, false)
}

type compositeError []error

func (ce compositeError) Error() string {
errs := make([]string, 0, len(ce))
for _, e := range ce {
errs = append(errs, fmt.Sprintln(e.Error()))
}

return strings.Join(errs, "")
}

func (pc *PlatformClient) updateAccessForPlan(ctx context.Context, request *platform.ModifyPlanAccessRequest, isEnabled bool) error {
var compositeErr compositeError

if request == nil {
return errors.Errorf("modify plan access request cannot be nil")
Expand All @@ -56,14 +45,15 @@ func (pc *PlatformClient) updateAccessForPlan(ctx context.Context, request *plat
return err
}

compositeErr := &reconcile.CompositeError{}
if orgGUIDs, ok := request.Labels[OrgLabelKey]; ok && len(orgGUIDs) != 0 {
for _, orgGUID := range orgGUIDs {
if err := pc.updateOrgVisibilityForPlan(ctx, plan, isEnabled, orgGUID); err != nil {
compositeErr = append(compositeErr, err)
compositeErr.Add(err)
}
}
if compositeErr != nil {
return errors.Wrapf(compositeErr, "error while updating access for catalog plan with id %s; %d errors occurred: %s", request.CatalogPlanID, len(compositeErr), compositeErr)
if compositeErr.Len() != 0 {
return errors.Wrapf(compositeErr, "error while updating access for catalog plan with id %s; %d errors occurred: %s", request.CatalogPlanID, compositeErr.Len(), compositeErr)
}
} else {
if err := pc.updatePlan(plan, isEnabled); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cf/service_access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ var _ = Describe("Client Service Plan Access", func() {
getPlansRoute.reaction.Code = ccResponseErrCode
})

It("returns an error", assertFunc(&orgData, &planGUID, &brokerGUID, &ccResponseErrBody))
It("returns an error", assertFunc(&orgData, &planGUID, &brokerGUID, fmt.Errorf(ccResponseErrBody.Description)))
})

Context("when no plan is found", func() {
Expand Down
47 changes: 18 additions & 29 deletions cf/service_visibilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strings"
"sync"

"github.com/Peripli/service-broker-proxy/pkg/sbproxy/reconcile"

"github.com/Peripli/service-manager/pkg/log"

"github.com/pkg/errors"
Expand Down Expand Up @@ -113,7 +115,7 @@ func (pc *PlatformClient) GetVisibilitiesByBrokers(ctx context.Context, brokerNa
}

func (pc *PlatformClient) getBrokersByName(ctx context.Context, names []string) ([]cfclient.ServiceBroker, error) {
var errorOccured error
errorOccured := &reconcile.CompositeError{}
var mutex sync.Mutex
var wg sync.WaitGroup
wgLimitChannel := make(chan struct{}, pc.settings.Reconcile.MaxParallelRequests)
Expand All @@ -138,27 +140,24 @@ func (pc *PlatformClient) getBrokersByName(ctx context.Context, names []string)
query := queryBuilder{}
query.set("name", brokerNames)
brokers, err := pc.client.ListServiceBrokersByQuery(query.build(ctx))

mutex.Lock()
defer mutex.Unlock()
if err != nil {
if errorOccured == nil {
errorOccured = err
}
} else if errorOccured == nil {
errorOccured.Add(err)
} else if errorOccured.Len() == 0 {
result = append(result, brokers...)
}
}(chunk)
}
wg.Wait()
if errorOccured != nil {
if errorOccured.Len() != 0 {
return nil, errorOccured
}
return result, nil
}

func (pc *PlatformClient) getServicesByBrokers(ctx context.Context, brokers []cfclient.ServiceBroker) ([]cfclient.Service, error) {
var errorOccured error
errorOccured := &reconcile.CompositeError{}
var mutex sync.Mutex
var wg sync.WaitGroup
wgLimitChannel := make(chan struct{}, pc.settings.Reconcile.MaxParallelRequests)
Expand All @@ -183,20 +182,17 @@ func (pc *PlatformClient) getServicesByBrokers(ctx context.Context, brokers []cf
brokerGUIDs = append(brokerGUIDs, broker.Guid)
}
brokers, err := pc.getServicesByBrokerGUIDs(ctx, brokerGUIDs)

mutex.Lock()
defer mutex.Unlock()
if err != nil {
if errorOccured == nil {
errorOccured = err
}
} else if errorOccured == nil {
errorOccured.Add(err)
} else if errorOccured.Len() == 0 {
result = append(result, brokers...)
}
}(chunk)
}
wg.Wait()
if errorOccured != nil {
if errorOccured.Len() != 0 {
return nil, errorOccured
}
return result, nil
Expand All @@ -209,7 +205,7 @@ func (pc *PlatformClient) getServicesByBrokerGUIDs(ctx context.Context, brokerGU
}

func (pc *PlatformClient) getPlansByServices(ctx context.Context, services []cfclient.Service) ([]cfclient.ServicePlan, error) {
var errorOccured error
errorOccured := &reconcile.CompositeError{}
var mutex sync.Mutex
var wg sync.WaitGroup
wgLimitChannel := make(chan struct{}, pc.settings.Reconcile.MaxParallelRequests)
Expand All @@ -234,20 +230,17 @@ func (pc *PlatformClient) getPlansByServices(ctx context.Context, services []cfc
serviceGUIDs = append(serviceGUIDs, service.Guid)
}
plans, err := pc.getPlansByServiceGUIDs(ctx, serviceGUIDs)

mutex.Lock()
defer mutex.Unlock()
if err != nil {
if errorOccured == nil {
errorOccured = err
}
} else if errorOccured == nil {
errorOccured.Add(err)
} else if errorOccured.Len() == 0 {
result = append(result, plans...)
}
}(chunk)
}
wg.Wait()
if errorOccured != nil {
if errorOccured.Len() != 0 {
return nil, errorOccured
}
return result, nil
Expand All @@ -261,7 +254,7 @@ func (pc *PlatformClient) getPlansByServiceGUIDs(ctx context.Context, serviceGUI

func (pc *PlatformClient) getPlansVisibilities(ctx context.Context, plans []cfclient.ServicePlan) ([]cfclient.ServicePlanVisibility, error) {
var result []cfclient.ServicePlanVisibility
var errorOccured error
errorOccured := &reconcile.CompositeError{}
var wg sync.WaitGroup
var mutex sync.Mutex
wgLimitChannel := make(chan struct{}, pc.settings.Reconcile.MaxParallelRequests)
Expand All @@ -286,21 +279,17 @@ func (pc *PlatformClient) getPlansVisibilities(ctx context.Context, plans []cfcl
plansGUID = append(plansGUID, p.Guid)
}
visibilities, err := pc.getPlanVisibilitiesByPlanGUID(ctx, plansGUID)

mutex.Lock()
defer mutex.Unlock()

if err != nil {
if errorOccured == nil {
errorOccured = err
}
} else if errorOccured == nil {
errorOccured.Add(err)
} else if errorOccured.Len() == 0 {
result = append(result, visibilities...)
}
}(chunk)
}
wg.Wait()
if errorOccured != nil {
if errorOccured.Len() != 0 {
return nil, errorOccured
}
return result, nil
Expand Down

0 comments on commit e4934f7

Please sign in to comment.